package com.atguigu.flink.state.operatorstate;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Demo of Flink broadcast state.
 *
 * <p>Two socket text streams are read: a data stream of {@link WaterSensor} records
 * (port 8888) and a configuration stream of {@link MyConf} records (port 8889, one
 * {@code id,name} pair per line). The configuration stream is broadcast and each
 * configuration is stored in broadcast state under its id. Every sensor record looks up
 * the configuration stored under its own id and, if one exists, has its id replaced by
 * the configured name before being emitted.
 *
 * <p>Created by Smexy on 2023/11/17
 */
public class Demo3_BroadcastState
{
    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the job fails to execute; the original failure is
     *                               attached as the cause
     */
    public static void main(String[] args) {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Data stream.
        SingleOutputStreamOperator<WaterSensor> ds1 = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        // Configuration stream: each line is expected to look like "id,name".
        SingleOutputStreamOperator<MyConf> ds2 = env
            .socketTextStream("hadoop102", 8889)
            .map(new MapFunction<String, MyConf>()
            {
                @Override
                public MyConf map(String value) throws Exception {
                    String[] words = value.split(",");
                    // Fail with a message that names the bad input instead of a bare
                    // ArrayIndexOutOfBoundsException.
                    if (words.length < 2) {
                        throw new IllegalArgumentException(
                            "Malformed config line, expected \"id,name\" but got: " + value);
                    }
                    return new MyConf(
                        words[0],
                        words[1]
                    );
                }
            });

        MapStateDescriptor<String, MyConf> mapStateDescriptor = new MapStateDescriptor<>("conf", String.class, MyConf.class);
        // The configuration stream must be turned into a broadcast stream before
        // broadcast state can be used.
        BroadcastStream<MyConf> broadcastStream = ds2.broadcast(mapStateDescriptor);

        // Connect the two streams.
        BroadcastConnectedStream<WaterSensor, MyConf> connectedStream = ds1.connect(broadcastStream);

        connectedStream
            .process(new BroadcastProcessFunction<WaterSensor, MyConf, WaterSensor>()
            {
                // With broadcast state there is no need to keep the latest config in a
                // hand-managed field such as "private MyConf conf".

                /**
                 * Data side: reads the configuration stored under the sensor's id and
                 * rewrites the sensor's id to the configured name before emitting it.
                 */
                @Override
                public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<WaterSensor> out) throws Exception {
                    ReadOnlyBroadcastState<String, MyConf> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                    // Read the shared information from broadcast state.
                    MyConf conf = broadcastState.get(value.getId());
                    // No config may have been broadcast for this id (yet); in that case
                    // forward the record unchanged instead of failing with an NPE.
                    if (conf != null) {
                        value.setId(conf.getName());
                    }
                    out.collect(value);
                }

                /**
                 * Broadcast side: stores the incoming configuration in broadcast state,
                 * keyed by its id, so that processElement can look it up.
                 */
                @Override
                public void processBroadcastElement(MyConf value, Context ctx, Collector<WaterSensor> out) throws Exception {
                    // The broadcast state is used like a map.
                    BroadcastState<String, MyConf> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                    broadcastState.put(value.getId(), value);
                }
            })
            .print();

        try {
            env.execute();
        } catch (Exception e) {
            // Do not swallow the failure: propagate it (with its cause) so the launcher
            // sees that the job did not run successfully.
            throw new IllegalStateException("Flink job execution failed", e);
        }

    }

    /**
     * Configuration record broadcast to all parallel instances: maps a sensor
     * {@code id} to the {@code name} that should replace it.
     *
     * <p>Declared {@code public} because Flink only treats public classes as POJO types;
     * a non-public class is handled as a generic type instead.
     */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class MyConf{
        private String id;
        private String name;
    }
}
